package com.xiaojie.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Description:kafka消费者
 * @author: xiaojie
 * @date: 2021.10.14
 */

@Component
@Slf4j
public class KafkaConsumer {

//    @KafkaListener(groupId = "xiaojie_group",topics = {"xiaojie-topic"})
//    public void onMessage(ConsumerRecord<?, ?> record) {
//        log.info("消费主题>>>>>>{},消费分区>>>>>>>>{},消费偏移量>>>>>{},消息内容>>>>>{}",
//                record.topic(), record.partition(), record.offset(), record.value());
//    }

    /*
     *
     * @param message
     * @param ack
     * @手动提交ack
     * containerFactory  手动提交消息ack
     * errorHandler 消费端异常处理器
     * @author xiaojie
     * @date 2021/10/14
     * @return void
     */
    @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic",
            errorHandler = "consumerAwareListenerErrorHandler"
    )
    public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
        for (int i=0;i<record.size();i++){
            System.out.println(record.get(i).value());
        }
        ack.acknowledge();//直接提交offset
    }

    /**
     * @description: id：消费者ID；
     * groupId：消费组ID；
     * topics：监听的topic，可监听多个； topics不能和topicPartitions同时使用
     * topicPartitions：可配置更加详细的监听信息，可指定topic、parition、offset监听。
     * @param:
     * @param: record
     * @return: void
     * @author xiaojie
     * @date: 2021/10/14 21:50
     */
    @KafkaListener(groupId = "xiaojie_group",topicPartitions = {
            @TopicPartition(topic = "test-topic", partitions = {"1"}),
            @TopicPartition(topic = "xiaojie-test-topic", partitions = {"1"},
                    partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "15"))
    })
    public void onMessage1(ConsumerRecord<?, ?> record) {
        //指定消费某个topic,的某个分区，指定消费位置
        //执行消费xiaojie-test-topic的1号分区，和xiaojie-test-topic的1和2号分区，并且2号分区从15开始消费
        log.info("消费主题>>>>>>:{},消费分区>>>>>>>>:{},消费偏移量>>>>>:{},消息内容>>>>>:{}",
                record.topic(), record.partition(), record.offset(), record.value());
    }

    /**
     * @description: 批量消费消息
     * @param:
     * @param: records
     * @return: void
     * @author xiaojie
     * @date: 2021/10/14 21:52
     */
//    @KafkaListener(topics = "xiaojie-topic")
//    public void batchOnMessage(List<ConsumerRecord<?, ?>> records) {
//        for (ConsumerRecord<?, ?> record : records) {
//            log.info("批量消费消息>>>>>>>>>>>>>>>>>{}", record.value());
//        }
//    }


    //在实际开发中，我们可能有这样的需求，应用A从TopicA获取到消息，
    // 经过处理后转发到TopicB，再由应用B监听处理消息，即一个应用处理完成后将该消息转发至其他应用，完成消息的转发。
//    @KafkaListener(topics = {"my-topic"})
//    @SendTo("xiaojie-topic")
//    public String forwardMSg(ConsumerRecord<?, ?> record) {
//        return record.value() + "消费完之后转发到第二个topic";
//    }
    @KafkaListener(topics = {"callback-topic"})
    public void callBackOnMessage(ConsumerRecord<?,?> record){
        System.out.println("回调方式生产者发送的消息是："+record.value());
    }
    /**
     * @description: 事务提交的方式消费者
     * @param:
     * @param: record
     * @return: void
     * @author xiaojie
     * @date: 2021/10/15 22:20
     */
    @KafkaListener(topics = {"tx-topic"})
    public void txOnmessage(ConsumerRecord<?,?> record){
        System.out.println("事务的方式消费者消费接收到的消息："+record.value());
    }

    /**
     * @description: 消费者过滤器
     * @param:
     * @param: record
     * @return: void
     * @author xiaojie
     * @date: 2021/10/16 1:04
     */
    @KafkaListener(topics = "filter-topic",containerFactory = "filterFactory")
    public void filterOnmessage(ConsumerRecord<?,?> record){
        log.info("消费到的消息是：》》》》》》》》》》》{}",record.value());
    }




    // 模拟不同的分组可以消费同一条消息start。。。。。。。。。。。。。。。。。。
    @KafkaListener(groupId = "xiaojie_group-1",topics = {"snail-test-topic"})
    public void onMessage(ConsumerRecord<?, ?> record,Acknowledgment ack) {
        log.info("分组是xiaojie_group-1>>>>>>>>>>>>>>>>>>>:消费主题>>>>>>{},消费分区>>>>>>>>{},消费偏移量>>>>>{},消息内容>>>>>{}",
                record.topic(), record.partition(), record.offset(), record.value());
        ack.acknowledge();
    }


    @KafkaListener(groupId = "xiaojie_group-2",topics = {"snail-test-topic"})
    public void onMessage3(ConsumerRecord<?, ?> record,Acknowledgment ack) {
        log.info("分组是xiaojie_group-2>>>>>>>>>>>>>>>>>>>:消费主题>>>>>>{},消费分区>>>>>>>>{},消费偏移量>>>>>{},消息内容>>>>>{}",
                record.topic(), record.partition(), record.offset(), record.value());
        ack.acknowledge();
    }
    // 模拟不同的分组可以消费同一条消息end。。。。。。。。。。。。。。。。。。

}
